package com.my.service.task.sink;

import com.my.service.task.entity.BrandLike;
import com.my.service.task.util.MongoUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

/**
 * Flink sink that folds incoming {@link BrandLike} counts into a stored per-brand document.
 *
 * <p>For every record the sink looks up the existing document through
 * {@code MongoUtils.findOneBy}, adds the record's count to the stored {@code "count"} value
 * (or starts a new document when none is found), and writes the result back through
 * {@code MongoUtils.saveOrUpdateMongo}.
 *
 * <p>NOTE(review): the lookup and the write are two separate calls, so this looks like a
 * non-atomic read-modify-write. Whether concurrent sink instances can lose updates depends on
 * how {@code MongoUtils} and the job's parallelism are set up — confirm.
 */
public class BrandLikeSink implements SinkFunction<BrandLike> {
    // First and second arguments handed to both MongoUtils calls.
    // Presumably collection and database names — verify against MongoUtils.
    private static final String BRAND_LIKE_STATICS = "brandlikestatics";
    private static final String REALTIME_PORTRAIT = "realtimeportrait";

    // Field names used inside the stored document.
    private static final String INFO_FIELD = "info";
    private static final String COUNT_FIELD = "count";

    /**
     * Adds the record's count to the stored total for its brand and persists the result.
     *
     * @param value   record carrying the brand and the count to add
     * @param context sink context supplied by Flink; not used here
     * @throws Exception if the lookup or the write fails
     */
    @Override
    public void invoke(BrandLike value, Context context) throws Exception {
        String brand = value.getBrand();
        long count = value.getCount();

        Document res = MongoUtils.findOneBy(BRAND_LIKE_STATICS, REALTIME_PORTRAIT, brand);
        if (res == null) {
            // Nothing stored for this brand yet: start a fresh document with this record's count.
            res = new Document();
            res.put(INFO_FIELD, brand);
            res.put(COUNT_FIELD, count);
        } else {
            res.put(COUNT_FIELD, storedCount(res) + count);
        }

        MongoUtils.saveOrUpdateMongo(BRAND_LIKE_STATICS, REALTIME_PORTRAIT, res);
    }

    /**
     * Reads the stored count from an existing document.
     *
     * <p>A missing or null field counts as 0 instead of failing on auto-unboxing, and any numeric
     * representation (not only {@code Long}) is accepted, because {@code Document.getLong} is a
     * plain cast that rejects other numeric types. A non-numeric value still fails with
     * {@link ClassCastException}.
     *
     * @param doc document previously returned by the lookup
     * @return the stored count as a {@code long}, or 0 when the field is absent
     */
    private static long storedCount(Document doc) {
        Object stored = doc.get(COUNT_FIELD);
        if (stored == null) {
            return 0L;
        }
        return ((Number) stored).longValue();
    }
}
